<!DOCTYPE html>
<html>

<head>

    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta name="renderer" content="webkit">
    <meta http-equiv="Cache-Control" content="no-siteapp"/>
    <title>数据管理 | 配置文档 | 数据管理系统</title>

    <meta name="keywords" content="数据管理 | 配置文档 | 数据管理系统">
    <meta name="description" content="数据管理 | 配置文档 | 数据管理系统">

    <!-- 全局CSS -->
    <script src="/js/meta.js"></script>
    <script type="text/javascript">
        BASE_CSS();
    </script>
    <link rel="stylesheet" href="/js/plugins/highlight/default.css">
    <link rel="stylesheet" href="/js/plugins/highlight/github-gist.css">
    <style>
        li {
            padding-top: 2px;
            padding-bottom: 3px;
        }

        .plugin-box {
            margin-bottom: 20px;
        }

        pre {
            text-align: left;
        }
    </style>
</head>

<body class="gray-bg">
<div class="wrapper wrapper-content animated fadeInRight">
    <div class="row">
        <div class="col-sm-12">
            <div class="ibox float-e-margins">
                <div class="ibox-title">
                    <h3>数据管理配置文档</h3>
                </div>
                <div class="ibox-content">
                    <div class="row">
                        <div class="col-sm-2">
                            <fieldset style="position: static;top: 80px;">
                                <legend style="font-size: 16px;">数据抽取</legend>
                                <ul >
                                    <li><a href="#plugin-sm">插件配置说明</a></li>
                                    <li><a href="#ConsoleWriter">Console</a>
                                        <ul>
                                            <li><a href="#ConsoleWriter">ConsoleWriter</a></li>
                                        </ul>
                                    </li>
                                    <li><a href="#MySqlReader">Database</a>
                                        <ul>
                                            <li><a href="#MySqlReader">MySql</a>
                                                <ul>
                                                    <li><a href="#MySqlReader">MySqlReader</a></li>
                                                    <li><a href="#MySqlWriter">MySqlWriter</a></li>
                                                </ul>
                                            </li>
                                            <li><a href="#OracleReader">Oracle</a>
                                                <ul>
                                                    <li><a href="#OracleReader">OracleReader</a></li>
                                                    <li><a href="#OracleWriter">OracleWriter</a></li>
                                                </ul>
                                            </li>
                                            <li><a href="#SqlServerReader">SqlServer</a>
                                            <ul>
                                                <li><a href="#SqlServerReader">SqlServerReader</a></li>
                                                <li><a href="#SqlServerWriter">SqlServerWriter</a></li>
                                            </ul>
                                        </li>
                                        </ul>
                                    </li>
                                    <li><a href="#ElasticSearchReader">ElasticSearch</a>
                                        <ul>
                                            <li><a href="#ElasticSearchReader">ElasticSearchReader</a></li>
                                            <li><a href="#ElasticSearchWriter">ElasticSearchWriter</a></li>
                                        </ul>
                                    </li>
                                    <li><a href="#TxtReader">TXT</a>
                                        <ul>
                                            <li><a href="#TxtReader">TxtReader</a></li>
                                            <li><a href="#TxtWriter">TxtWriter</a></li>
                                        </ul>
                                    </li>
                                    <li><a href="#FtpTxtReader">FTP</a>
                                        <ul>
                                            <li><a href="#FtpTxtReader">FtpTxtReader</a></li>
                                            <li><a href="#FtpTxtWriter">FtpTxtWriter</a></li>
                                        </ul>
                                    </li>
                                    <li><a href="#Reader">插件开发</a></li>
                                        <ul>
                                            <li><a href="#Reader">Reader 插件开发</a></li>
                                            <li><a href="#Writer">Writer 插件开发</a></li>
                                        </ul>
                                    </li>
                                </ul>
                            </fieldset>
                            <fieldset style="position: static;top: 80px;">
                                <legend style="font-size: 16px;">任务调度</legend>
                                <ul >
                                    <li><a href="#task-sm">任务配置说明</a></li>
                                    <li><a href="#task">任务开发</a></li>
                                </ul>
                            </fieldset>
                        </div>
                        <div class="col-sm-9">
                            <fieldset class="plugin-box" id="plugin-sm">
                                <legend>插件配置说明</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>插件必须包含两个部分：<code>reader</code>、<code>writer</code>。这两部分配置的配置项会被当做参数传入<code>name</code>配置中指定的插件类。插件类中可通过<code>init(Map params)</code>方法初始化，也可通过<code>getParams()</code>方法获取到。</p>
                                            <p><code>reader</code>和<code>writer</code>中参数<code>name</code>为必填项。</p>
                                            <p>各个<code>reader</code>和<code>writer</code>插件可进行组合，并可根据业务需求进行<a href="#Reader">插件开发</a></p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
        "reader":{
            "name":"com.anluy.datapig.plugin.database.oracle.OracleReader",
            "url":"jdbc:oracle:thin:@68.64.9.188:1521:jzdb",
            "username":"jzck",
            "password":"jzck",
            "sql":"select * from zjjk_jyls_20180907 t"
        },
        "writer":{
            "name":"com.anluy.datapig.plugin.database.mysql.MySqlWriter",
            "url":"jdbc:mysql://68.64.8.82:3306/data-pig?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull",
            "username":"root",
            "password":"xinghuo",
            "tableName":"zjjk_jyls",
            "batchSize":"500"
        }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="ConsoleWriter">
                                <legend>com.anluy.datapig.plugin.console.ConsoleWriter</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>输出 Console 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.console.ConsoleWriter</code>，必填项。</p>
                                            <p>参数<code>format</code>：数据格式化选项。
                                            <p>&emsp;&emsp;参数<code>date</code>：对指定字段的数据进行时间格式化，转换为字符串格式，可填项。</p>
                                            <p>&emsp;&emsp;目前只实现了以上一个格式化方法。</p>
                                            </p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
        "writer":{
            "name":"com.anluy.datapig.plugin.console.ConsoleWriter",
            "format":{
                "create_time":{"date":"yyyy-MM-dd HH:mm:ss"}
            }
       }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="MySqlReader">
                                <legend>com.anluy.datapig.plugin.database.mysql.MySqlReader</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>读取 MySql 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.database.mysql.MySqlReader</code>，必填项。</p>
                                            <p>参数<code>url</code>：值为数据库的JDBC连接串，必填项。</p>
                                            <p>参数<code>username</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>encoding</code>：值为读取数据库字符字段的编码设置，可填项。</p>
                                            <p>参数<code>sql</code>：值为查询数据的SQL串，必填项。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
        "reader":{
           "name":"com.anluy.datapig.plugin.database.mysql.MySqlReader",
           "url":"jdbc:mysql://68.64.9.205:3307/p2p_merge?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&tcpRcvBuf=2048000",
           "username":"root",
           "password":"root_jzzd",
           "encoding":"UTF-8",
           "sql":"select * from p2p_onlineloan_transaction_copy t",
           "format":{
               "createtime":{"date":"yyyy-MM-dd HH:mm:ss"},
               "updatetime":{"date":"yyyy-MM-dd HH:mm:ss"}
           }
       }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="MySqlWriter">
                                <legend>com.anluy.datapig.plugin.database.mysql.MySqlWriter</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>写入 MySql 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.database.mysql.MySqlWriter</code>，必填项。</p>
                                            <p>参数<code>url</code>：值为数据库的JDBC连接串，必填项。</p>
                                            <p>参数<code>username</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>
                                                参数<code>tableName</code>：需要插入数据的表名，必填项。这张表必须存在，否则会报错，插入的<code>INSERT</code>语句是自动根据读取插件读取到的字段名生成。
                                            </p>
                                            <p>参数<code>batchSize</code>：每次批量提交的记录数，必填项。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                            <p><span style="color: red;">&bigstar; 注意：</span>数据插入不会判断数据是否存在，有可能会存在主键冲突。如需判断需自定义插件。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
        "writer":{
            "name":"com.anluy.datapig.plugin.database.mysql.MySqlWriter",
            "url":"jdbc:mysql://68.64.8.82:3306/data-pig?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull",
            "username":"root",
            "password":"xinghuo",
            "tableName":"p2p_onlineloan_transaction_copy",
            "batchSize":"500"
        }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="OracleReader">
                                <legend>com.anluy.datapig.plugin.database.oracle.OracleReader</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>读取 Oracle 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.database.oracle.OracleReader</code>，必填项。</p>
                                            <p>参数<code>url</code>：值为数据库的JDBC连接串，必填项。</p>
                                            <p>参数<code>username</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>encoding</code>：值为读取数据库字符字段的编码设置，可填项。</p>
                                            <p>参数<code>sql</code>：值为查询数据的SQL串，必填项。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
       "reader":{
           "name":"com.anluy.datapig.plugin.database.oracle.OracleReader",
           "url":"jdbc:oracle:thin:@68.64.9.188:1521:jzdb",
           "username":"jzck",
           "password":"jzck",
           "encoding":"UTF-8",
           "sql":"select * from zjjk_jyls_20180907 t"
       },
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="OracleWriter">
                                <legend>com.anluy.datapig.plugin.database.oracle.OracleWriter</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>写入 Oracle 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.database.oracle.OracleWriter</code>，必填项。</p>
                                            <p>参数<code>url</code>：值为数据库的JDBC连接串，必填项。</p>
                                            <p>参数<code>username</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>
                                                参数<code>tableName</code>：需要插入数据的表名，必填项。这张表必须存在，否则会报错，插入的<code>INSERT</code>语句是自动根据读取插件读取到的字段名生成。
                                            </p>
                                            <p>参数<code>batchSize</code>：每次批量提交的记录数，必填项。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                            <p><span style="color: red;">&bigstar; 注意：</span>数据插入不会判断数据是否存在，有可能会存在主键冲突。如需判断需自定义插件。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
       "writer":{
           "name":"com.anluy.datapig.plugin.database.oracle.OracleWriter",
           "url":"jdbc:oracle:thin:@68.64.9.188:1521:jzdb",
           "username":"jzck",
           "password":"jzck",
           "tableName":"zjjk_jyls",
           "batchSize":"500"
       }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="SqlServerReader">
                                <legend>com.anluy.datapig.plugin.database.sqlserver.SqlServerReader</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>读取 Oracle 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.database.sqlserver.SqlServerReader</code>，必填项。</p>
                                            <p>参数<code>url</code>：值为数据库的JDBC连接串，必填项。</p>
                                            <p>参数<code>username</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>encoding</code>：值为读取数据库字符字段的编码设置，可填项。</p>
                                            <p>参数<code>sql</code>：值为查询数据的SQL串，必填项。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
       "reader":{
           "name":"com.anluy.datapig.plugin.database.sqlserver.SqlServerReader",
           "url":"jdbc:sqlserver://68.64.9.188:1443;DatabaseName=jzdb",
           "username":"jzzd",
           "password":"111111",
           "encoding":"UTF-8",
           "sql":"select * from Fa_BalHistory t"
       },
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="SqlServerWriter">
                                <legend>com.anluy.datapig.plugin.database.sqlserver.SqlServerWriter</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>写入 Oracle 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.database.sqlserver.SqlServerWriter</code>，必填项。</p>
                                            <p>参数<code>url</code>：值为数据库的JDBC连接串，必填项。</p>
                                            <p>参数<code>username</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为数据库登陆用户名，必填项。</p>
                                            <p>
                                                参数<code>tableName</code>：需要插入数据的表名，必填项。这张表必须存在，否则会报错，插入的<code>INSERT</code>语句是自动根据读取插件读取到的字段名生成。
                                            </p>
                                            <p>参数<code>batchSize</code>：每次批量提交的记录数，必填项。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                            <p><span style="color: red;">&bigstar; 注意：</span>数据插入不会判断数据是否存在，有可能会存在主键冲突。如需判断需自定义插件。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
       "writer":{
           "name":"com.anluy.datapig.plugin.database.sqlserver.SqlServerWriter",
           "url":"jdbc:sqlserver://68.64.9.188:1443;DatabaseName=jzdb",
           "username":"jzzd",
           "password":"111111",
           "tableName":"zjjk_jyls",
           "batchSize":"500"
       }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="ElasticSearchReader">
                                <legend>com.anluy.datapig.plugin.elasticsearch.ElasticSearchReader</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>读取 ElasticSearch 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.elasticsearch.ElasticSearchReader</code>，必填项。</p>
                                            <p>参数<code>host</code>：值为ElasticSearch的连接串，多个地址用<code>,</code>分隔，必填项。</p>
                                            <p>参数<code>username</code>：值为ElasticSearch登陆用户名，必填项。如果ElasticSearch没有配置用户验证，随便输入一串字符即可。</p>
                                            <p>参数<code>password</code>：值为ElasticSearch登陆用户名，必填项。如果ElasticSearch没有配置用户验证，随便输入一串字符即可。</p>
                                            <p>参数<code>indexName</code>：需要插入数据的索引名，必填项。这个索引在ElasticSearch中必须存在，否则会报错。</p>
                                            <p>参数<code>typeName</code>：需要插入数据的type名，必填项。保存数据的<code>json</code>或<code>sql</code>的字段名是自动根据读取到的字段名生成。</p>
                                            <p>参数<code>dsl</code>：查询ElasticSearch的DSL语句，可填项，不填时自动生成<code>{"size":500,"query":{"match_all":{}}}</code>。</p>
                                            <p>参数<code>prefix</code>：将ElasticSearch的字段名进行前缀<code>删除</code>，可填项。</p>
                                            <p>参数<code>mapping</code>：将ElasticSearch的字段名进行映射，可填项。</p>
                                            <p>参数<code>format</code>：数据格式化选项。
                                            <p>&emsp;&emsp;参数<code>date</code>：对指定字段的数据进行时间格式化，转换为<code>java.util.Date</code>格式，可填项。</p>
                                            <p>&emsp;&emsp;参数<code>join</code>：对指定字段的数据（数组）进行拼接，转换为字符串格式，可填项。</p>
                                            <p>&emsp;&emsp;目前只实现了以上两个格式化方法。</p>
                                            </p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建ElasticSearch连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
   "plugin":{
       "reader":{
           "name":"com.anluy.datapig.plugin.elasticsearch.ElasticSearchReader",
           "host":"68.64.9.174:9602,68.64.9.176:9602,68.64.9.178:9602",
           "username":"szhzgk",
           "password":"szhzgk.com@402",
           "indexName":"test2",
           "typeName":"p2p_transaction_copy",
           "dsl":"",
           "prefix":"",
           "mapping":{
               "createtime":"create_time",
               "updatetime":"update_time",
               "unrepayinterest":"unrepay_interest",
               "hukouaddr":"hukou_addr",
               "nowaddr":"now_addr",
               "repaycapital":"repay_capital",
               "repayinterest":"repay_interest",
               "unrepaycapital":"unrepay_capital",
               "isemphasis":"is_emphasis",
               "emphasisrank":"emphasis_rank"
            },
           "format":{
               "createtime":{"date":"yyyy-MM-dd HH:mm:ss"},
               "updatetime":{"date":"yyyy-MM-dd HH:mm:ss"},
               "transaction_time":{"date":"yyyy-MM-dd HH:mm:ss"},
               "transaction_description":{"join":" "}
            }
       }
   }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="ElasticSearchWriter">
                                <legend>com.anluy.datapig.plugin.elasticsearch.ElasticSearchWriter</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>写入 ElasticSearch 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.elasticsearch.ElasticSearchWriter</code>，必填项。</p>
                                            <p>参数<code>host</code>：值为ElasticSearch的连接串，多个地址用<code>,</code>分隔，必填项。</p>
                                            <p>参数<code>username</code>：值为ElasticSearch登陆用户名，必填项。如果ElasticSearch没有配置用户验证，随便输入一串字符即可。</p>
                                            <p>参数<code>password</code>：值为ElasticSearch登陆用户名，必填项。如果ElasticSearch没有配置用户验证，随便输入一串字符即可。</p>
                                            <p>参数<code>indexName</code>：需要索引数据的索引名，必填项。这个索引在ElasticSearch中必须存在，否则会报错。</p>
                                            <p>参数<code>typeName</code>：需要索引数据的type名，必填项。这个type必须存在，否则可能会报错，索引数据的<code>json</code>字段名是自动根据读取插件读取到的字段名生成。</p>
                                            <p>参数<code>prefix</code>：将ElasticSearch的字段名进行前缀<code>添加</code>，可填项。</p>
                                            <p>参数<code>batchSize</code>：每次批量提交的记录数，必填项。</p>
                                            <p>参数<code>mapping</code>：将ElasticSearch的字段名进行映射，可填项。</p>
                                            <p>参数<code>format</code>：数据格式化选项。
                                                <p>&emsp;&emsp;参数<code>date</code>：对指定字段的数据进行时间格式化，转换为字符串格式，可填项。</p>
                                                <p>&emsp;&emsp;参数<code>split</code>：对指定字段的数据进行分割，转换为数组格式，可填项。</p>
                                                <p>&emsp;&emsp;目前只实现了以上两个格式化方法。</p>
                                            </p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建ElasticSearch连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
        "writer":{
            "name":"com.anluy.datapig.plugin.elasticsearch.ElasticSearchWriter",
            "host":"68.64.9.174:9602,68.64.9.176:9602,68.64.9.178:9602",
            "username":"szhzgk",
            "password":"szhzgk.com@402",
            "indexName":"test2",
            "typeName":"p2p_transaction_copy",
            "prefix":"",
            "batchSize":"500",
            "mapping":{
               "createtime":"create_time",
               "updatetime":"update_time"
            },
            "format":{
                "create_time":{"date":"yyyy-MM-dd HH:mm:ss"},
                "update_time":{"date":"yyyy-MM-dd HH:mm:ss"},
                "transaction_time":{"date":"yyyy-MM-dd HH:mm:ss"},
                "transaction_description":{"split":" "}
            }
        }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="TxtReader">
                                <legend>com.anluy.datapig.plugin.txt.TxtReader</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>读取 Txt文件 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.txt.TxtReader</code>，必填项。</p>
                                            <p>参数<code>filePath</code>：值为文件全路径，<code>filePath</code>、<code>fileDir</code>二填一，同时填写以<code>filePath</code>为准。</p>
                                            <p>参数<code>fileDir</code>：值为文件目录全路径，必填项。</p>
                                            <p>参数<code>prefix</code>：值为文件前缀，可填项。搭配<code>fileDir</code>获取文件列表使用。</p>
                                            <p>参数<code>suffix</code>：值为文件后缀，可填项。搭配<code>fileDir</code>获取文件列表使用。</p>
                                            <p>参数<code>separator</code>：值为文件中的数据的列分隔符，不填默认为<code>,</code>，可填项。</p>
                                            <p>参数<code>encoding</code>：值为读取文件数据的编码格式，不填默认为<code>UTF-8</code>，可填项</p>
                                            <p>参数<code>format</code>：数据格式化选项。
                                            <p>&emsp;&emsp;参数<code>date</code>：对指定字段的数据进行时间格式化，转换为<code>java.util.Date</code>格式，可填项。</p>
                                            <p>&emsp;&emsp;目前只实现了以上一个格式化方法。</p>
                                            </p>
                                            <p>以上参数在程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
   "plugin":{
       "reader":{
           "name":"com.anluy.datapig.plugin.txt.TxtReader",
           "filePath":"F:/feiq/feiq/AutoRecv Files/任小龙(70208404CD7E)/1.补充材料/春天金融/日报/91440300342941628F_20181010_p2p_d_j1001.txt",
           "fileDir":"",
           "prefix":"",
           "suffix":"",
           "separator":"",
           "encoding":"UTF-8",
           "format":{
                "create_time":{"date":"yyyy-MM-dd HH:mm:ss"}
           }
       }
   }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="TxtWriter">
                                <legend>com.anluy.datapig.plugin.txt.TxtWriter</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>写入 Txt 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.txt.TxtWriter</code>，必填项。</p>
                                            <p>参数<code>filePath</code>：值为文件全路径，必填项。</p>
                                            <p>参数<code>separator</code>：值为文件中的数据的列分隔符，不填默认为<code>,</code>，可填项。</p>
                                            <p>参数<code>encoding</code>：值为读取文件数据的编码格式，不填默认为<code>UTF-8</code>，可填项</p>
                                            <p>参数<code>format</code>：数据格式化选项。
                                            <p>&emsp;&emsp;参数<code>date</code>：对指定字段的数据进行时间格式化，转换为<code>java.lang.String</code>格式，可填项。</p>
                                            <p>&emsp;&emsp;目前只实现了以上一个格式化方法。</p>
                                            </p>
                                            <p>以上参数在程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
        "writer":{
           "name":"com.anluy.datapig.plugin.txt.TxtWriter",
           "filePath":"H:/91440300342941628F_20181010_p2p_d_j1001.txt",
           "separator":"",
           "encoding":"UTF-8",
           "format":{
               "create_time":{"date":"yyyy-MM-dd HH:mm:ss"}
           }
        }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="FtpTxtReader">
                                <legend>com.anluy.datapig.plugin.ftp.FtpTxtReader</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>读取 Ftp Txt文件 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.ftp.FtpTxtReader</code>，必填项。</p>
                                            <p>参数<code>host</code>：值为FTP的IP地址，必填项。</p>
                                            <p>参数<code>port</code>：值为FTP的端口号，可填项。</p>
                                            <p>参数<code>userName</code>：值为FTP的登录用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为FTP的登录密码，必填项。</p>
                                            <p>参数<code>filePath</code>：值为文件全路径，<code>filePath</code>、<code>fileDir</code>二填一，同时填写以<code>filePath</code>为准。</p>
                                            <p>参数<code>fileDir</code>：值为文件目录全路径，必填项。</p>
                                            <p>参数<code>prefix</code>：值为文件前缀，可填项。搭配<code>fileDir</code>获取文件列表使用。</p>
                                            <p>参数<code>suffix</code>：值为文件后缀，可填项。搭配<code>fileDir</code>获取文件列表使用。</p>
                                            <p>参数<code>separator</code>：值为文件中的数据的列分隔符，不填默认为<code>,</code>，可填项。</p>
                                            <p>参数<code>encoding</code>：值为读取文件数据的编码格式，不填默认为<code>UTF-8</code>，可填项</p>
                                            <p>参数<code>format</code>：数据格式化选项。
                                            <p>&emsp;&emsp;参数<code>date</code>：对指定字段的数据进行时间格式化，转换为<code>java.util.Date</code>格式，可填项。</p>
                                            <p>&emsp;&emsp;目前只实现了以上一个格式化方法。</p>
                                            </p>
                                            <p>以上参数在程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
   "plugin":{
       "reader":{
           "name":"com.anluy.datapig.plugin.ftp.FtpTxtReader",
           "host":"68.64.8.82",
           "port":"21",
           "userName":"administrator",
           "password":"xinghuo",
           "filePath":"/data-pig/91440300342941628F_20181010_p2p_d_j1003.txt",
           "fileDir":"",
           "prefix":"",
           "suffix":"",
           "separator":"",
           "encoding":"UTF-8",
           "format":{
                "create_time":{"date":"yyyy-MM-dd HH:mm:ss"}
           }
       }
   }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="FtpTxtWriter">
                                <legend>com.anluy.datapig.plugin.ftp.FtpTxtWriter</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>写入 Ftp Txt 数据插件：</p>
                                            <p>参数<code>name</code>：值为固定的<code>com.anluy.datapig.plugin.ftp.FtpTxtWriter</code>，必填项。</p>
                                            <p>参数<code>host</code>：值为FTP的IP地址，必填项。</p>
                                            <p>参数<code>port</code>：值为FTP的端口号，可填项。</p>
                                            <p>参数<code>userName</code>：值为FTP的登录用户名，必填项。</p>
                                            <p>参数<code>password</code>：值为FTP的登录密码，必填项。</p>
                                            <p>参数<code>filePath</code>：值为文件全路径，必填项。</p>
                                            <p>参数<code>separator</code>：值为文件中的数据的列分隔符，不填默认为<code>,</code>，可填项。</p>
                                            <p>参数<code>encoding</code>：值为读取文件数据的编码格式，不填默认为<code>UTF-8</code>，可填项</p>
                                            <p>参数<code>format</code>：数据格式化选项。
                                            <p>&emsp;&emsp;参数<code>date</code>：对指定字段的数据进行时间格式化，转换为<code>java.lang.String</code>格式，可填项。</p>
                                            <p>&emsp;&emsp;目前只实现了以上一个格式化方法。</p>
                                            </p>
                                            <p>以上参数在程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="json">
{
    "plugin":{
        "writer":{
           "name":"com.anluy.datapig.plugin.ftp.FtpTxtWriter",
           "host":"68.64.8.82",
           "port":"21",
           "userName":"administrator",
           "password":"xinghuo",
           "filePath":"/data-pig/91440300342941628F_20181010_p2p_d_j1003.txt",
           "separator":"",
           "encoding":"UTF-8",
           "format":{
               "create_time":{"date":"yyyy-MM-dd HH:mm:ss"}
           }
        }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="Reader">
                                <legend>Reader 插件开发</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>Reader类必须继承<code>com.anluy.datapig.plugin.core.Reader</code>实现<code>call</code>方法并调用<code>this.execute();</code>。</p>
                                            <p>Task任务实现类，在Reader类中创建一个内部类并继承<code>com.anluy.datapig.plugin.core.Reader.Task</code>实现<code>call</code>方法并调用任务的具体实现。</p>
                                            <p>实现<code>init(Map params)</code>方法，并在方法中初始化必要的参数和校验。</p>
                                            <p>实现<code>start()</code>方法，方法中初始化任务对象，并开始调用任务。</p>
                                            <p>实现<code>end()</code>方法，方法为当程序执行完成后的操作，如调用<code>shutdown()</code>关闭连接等。</p>
                                            <p>实现<code>shutdown()</code>方法，方法中关闭必要的连接，如数据库连接、线程池等等。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="java">
import com.anluy.datapig.job.service.JobManager;
import com.anluy.datapig.plugin.core.DataPigException;
import com.anluy.datapig.plugin.core.Reader;
import com.anluy.datapig.plugin.database.DataBaseType;
import com.anluy.datapig.plugin.element.*;
import com.anluy.datapig.plugin.exchanger.RecordExchanger;
import com.anluy.datapig.plugin.exchanger.RecordSender;
import com.anluy.datapig.plugin.utils.DBUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
 * 数据库读取插件
 *
 * @author hc.zeng
 * @create 2018-10-10 16:15
 */
public class DataBaseReader extends Reader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchReader.class);
    protected final byte[] EMPTY_CHAR_ARRAY = new byte[0];
    private DataBaseType dataBase;
    private Connection connection;
    private volatile boolean shutdown = false;
    private String encoding;
    private String sql;
    private ExecutorService exec;

    public DataBaseReader(JobManager jobManager, Map params, RecordExchanger recordExchanger) {
        super(jobManager, params, recordExchanger);
    }

    @Override
    public Boolean call() throws Exception {
        this.execute();
        return true;
    }

    /**
     * 初始化参数配置
     * @param params
     * @return
     */
    @Override
    public Object init(Map params) {
        String url = (String) params.get("url");
        String username = (String) params.get("username");
        String password = (String) params.get("password");
        //校验必选参数
        if (StringUtils.isBlank(url)) {
            throw new DataPigException("DataBaseReader Plugin : url is null!");
        }
        if (StringUtils.isBlank(username)) {
            throw new DataPigException("DataBaseReader Plugin : username is null!");
        }
        if (StringUtils.isBlank(password)) {
            throw new DataPigException("DataBaseReader Plugin : password is null!");
        }
        //创建数据库连接
        try {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("get connection start ");
            }
            connection = DBUtil.getConnection(dataBase, url, username, password);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("get connection OK ");
            }
            log("连接数据库完成");
        } catch (Exception e) {
            throw new DataPigException(e);
        }

        return null;
    }

    @Override
    public Object start() {
        //新建任务对象，并执行任务
        ElasticSearchReader.Task task = new ElasticSearchReader.Task();
        task.reader();
          //创建线程池的方式启动任务，适合多线程读取。
//        exec = Executor.executorService;
//        Future&ltBoolean&gt result = exec.submit(task);
//        try {
//            result.get();
//        } catch (ExecutionException e) {
//            throw new DataPigException(e);
//        } catch (InterruptedException e) {
//            Thread.currentThread().interrupt();
//        }
        return null;
    }

    @Override
    public Object shutdown() {
        shutdown = true;
        return null;
    }

    @Override
    public Object end() {
        shutdown();
        return null;
    }

    /**
     * 任务实现
     */
    public class Task extends Reader.Task {

        @Override
        public Boolean call() throws Exception {
            reader();
            return true;
        }
       /**
        * 读取数据
        */
        @Override
        protected void reader() {
            ResultSet resultSet = null;
            try {
                Statement statement = connection.createStatement();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("start query sql => " + sql);
                }
                resultSet = statement.executeQuery(sql);
                ResultSetMetaData metaData = resultSet.getMetaData();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("query ok，sender data");
                }
                log("读取数据SQL:" + sql);
                //开始读取数据
                long time = System.currentTimeMillis();
                int pcs = 0;
                int mite = 1;
                while (!shutdown && resultSet.next()) {
                    //转换数据并往通道中写入
                    this.transportOneRecord(getRecordExchanger(), resultSet, metaData);
                    long time2 = System.currentTimeMillis();
                    pcs++;
                    //每分钟记录一次日志信息
                    if ((time2 - time) > 60000 * mite) {
                        log("读取数据" + pcs + "条");
                        mite++;
                    }
                }
                //往通道中写入一个完成标记的数据行
                getRecordExchanger().terminate();
                log("读取数据完成，共" + pcs + "条");
            } catch (SQLException e) {
                LOGGER.error("read database fail :" + e.getMessage(), e);
                getRecordExchanger().shutdown();
                log("读取数据发生异常:" + e.getMessage());
                throw new DataPigException(e);
            } finally {
                if (resultSet != null) {
                    try {
                        resultSet.close();
                    } catch (SQLException e) {
                        LOGGER.error("close resultSet fail :" + e.getMessage(), e);
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        LOGGER.error("close connection fail :" + e.getMessage(), e);
                    }
                }
            }
        }
        /**
         * 转换数据
         * @param recordSender
         * @param resultSet
         * @param metaData
         * @return
         */
        protected Record transportOneRecord(RecordSender recordSender, ResultSet resultSet, ResultSetMetaData metaData) {
            Record record = buildRecord(recordSender, resultSet, metaData);
            //往通道中写入一条记录
            recordSender.sendToWriter(record);
            return record;
        }
        /**
         * 创建一行数据对象,并转换数据格式
         * @param recordSender
         * @param rs
         * @param metaData
         * @return
         */
        protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData) {
             //...........
        }
    }
}
                                            </pre>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="Writer">
                                <legend>Writer 插件开发</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>Writer类必须继承<code>com.anluy.datapig.plugin.core.Writer</code>实现<code>call</code>方法并调用<code>this.execute();</code>。</p>
                                            <p>Task任务实现类，在Writer类中创建一个内部类并继承<code>com.anluy.datapig.plugin.core.Writer.Task</code>实现<code>call</code>方法并调用任务的具体实现。</p>
                                            <p>实现<code>init(Map params)</code>方法，并在方法中初始化必要的参数和校验。</p>
                                            <p>实现<code>start()</code>方法，方法中初始化任务对象，并开始调用任务。</p>
                                            <p>实现<code>end()</code>方法，方法为当程序执行完成后的操作，如调用<code>shutdown()</code>关闭连接等。</p>
                                            <p>实现<code>shutdown()</code>方法，方法中关闭必要的连接，如数据库连接、线程池等等。</p>
                                            <p>以上参数为插件类中通过<code>init(Map params)</code>方法初始化创建数据库连接使用，程序中可通过<code>getParams()</code>方法获取到。</p>
                                        </div>
                                        <pre class="java">
import com.anluy.datapig.job.service.JobManager;
import com.anluy.datapig.plugin.core.DataPigException;
import com.anluy.datapig.plugin.core.Wirter;
import com.anluy.datapig.plugin.element.Record;
import com.anluy.datapig.plugin.element.TerminateRecord;
import com.anluy.datapig.plugin.exchanger.RecordExchanger;
import com.anluy.datapig.plugin.utils.DBUtil;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 数据库写入插件
 *
 * @author hc.zeng
 * @create 2018-10-10 16:30
 */

public abstract class DataBaseWriter extends Wirter {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataBaseWriter.class);
    private DataBaseType dataBase;
    private int batchSize = 500;
    private volatile boolean shutdown = false;
    private ExecutorService exec;
    private String url;
    private String username;
    private String password;
    private String tableName;
    private AtomicLong atomicLong = new AtomicLong();

    public DataBaseWriter(JobManager jobManager, Map params, RecordExchanger recordExchanger) {
        super(jobManager, params, recordExchanger);
    }

    @Override
    public Boolean call() throws Exception {
        this.execute();
        return true;
    }

    /**
     * 初始化参数
     *
     * @param params
     * @return
     */
    @Override
    public Object init(Map params) {
        url = (String) params.get("url");
        username = (String) params.get("username");
        password = (String) params.get("password");
        String batchSize = (String) params.get("batchSize");
        tableName = (String) params.get("tableName");
        //校验必须参数
        if (StringUtils.isBlank(url)) {
            throw new DataPigException("DataBaseWriter Plugin : url is null!");
        }
        if (StringUtils.isBlank(username)) {
            throw new DataPigException("DataBaseWriter Plugin : username is null!");
        }
        if (StringUtils.isBlank(password)) {
            throw new DataPigException("DataBaseWriter Plugin : password is null!");
        }
        if (StringUtils.isBlank(tableName)) {
            throw new DataPigException("DataBaseWriter Plugin : tableName is null!");
        }
        if (dataBase == null) {
            throw new DataPigException("DataBaseWriter Plugin : dataBase is null!");
        }
        if (StringUtils.isNotBlank(batchSize)) {
            try {
                this.batchSize = Integer.valueOf(batchSize);
            } catch (Exception e) {
                throw new DataPigException("batchSize not's a number", e);
            }
        }
        return null;
    }


    /**
     * 开始执行
     * @return
     */
    @Override
    public Object start() {
        //创建线程池
        exec = Executor.executorService;
        //创建多个任务，并批量执行。并等等执行结果
        Collection&ltTask&gt taskList = new ArrayList&ltTask&gt();
        for (int i = 0; i < ThreadSize; i++) {
            Task task = new Task();
            taskList.add(task);
        }
        List&ltFuture&ltBoolean&gt&gt results = Lists.newArrayList();
        try {
            results = exec.invokeAll(taskList);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        //获取执行结果
        for (Future&ltBoolean&gt result : results) {
            try {
                result.get();
            } catch (ExecutionException e) {
                //如果线程内部有错误，往上层抛出。
                throw new DataPigException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return null;
    }

    @Override
    public Object shutdown() {
        shutdown = true;
        return null;
    }

    @Override
    public Object end() {
        shutdown();
        return null;
    }

    /**
     * 任务实现
     */
    public class Task extends Wirter.Task {
        private String insertSql;
        private Connection connection;

        @Override
        public Boolean call() throws Exception {
            writer();
            return null;
        }

        /**
         * 任务的具体实现
         */
        @Override
        protected void writer() {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("启动写数据线程");
            }
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("写数据线程获取数据库连接");
                }
                connection = DBUtil.getConnection(dataBase, url, username, password);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("写数据线程获取数据库连接完成");
                }

                log("连接数据库完成");

                long time = System.currentTimeMillis();
                int mite = 1;
                while (!shutdown) {
                    Record record = null;
                    List&ltRecord&gt dataList = new ArrayList<>();
                    //获取一批数据
                    for (int i = 0; i < batchSize; i++) {
                        //从通道中获取一条记录
                        record = getRecordExchanger().getFromReader();
                        if (record instanceof TerminateRecord) {
                            break;
                        }
                        if (StringUtils.isBlank(insertSql)) {
                            getInsertSql(record);
                            log("生成插入SQL:" + insertSql);
                        }
                        dataList.add(record);
                    }
                    //批量保存
                    save(dataList);

                    //每分钟记录一次日志
                    long time2 = System.currentTimeMillis();
                    if ((time2 - time) > 60000 * mite) {
                        log("总共写入数据" + atomicLong.get() + "条");
                        mite++;
                    }
                    //如果读取到结束标记，将通道关闭，并将当前线程标记为关闭,
                    if (record instanceof TerminateRecord) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("写数据线程获取到结束标记");
                        }
                        //标记通道为关闭，让其他线程也能根据关闭状态一起关闭了
                        getRecordExchanger().shutdown();
                        shutdown = true;
                        break;
                    }
                }

            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
                getRecordExchanger().shutdown();
                log("数据入库发生异常:" + e.getMessage());
                throw new DataPigException(e);
            } finally {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("数据入库完成，总共写入数据" + atomicLong.get() + "条");
                }
                log("数据入库完成，总共写入数据" + atomicLong.get() + "条");
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        LOGGER.error("关闭数据库连接失败", e);
                    }
                }
            }
        }

        /**
         * 生成INSERT 的sql语句
         * @param record
         * @return
         */
        private String getInsertSql(Record record) {
            if (StringUtils.isBlank(insertSql)) {
                StringBuffer sb = new StringBuffer();
                StringBuffer sbCol = new StringBuffer();
                for (int i = 0; i < record.getColumnNumber(); i++) {
                    if (i > 0) {
                        sb.append(",");
                        sbCol.append(",");
                    }
                    sb.append(record.getColumn(i).getColumnName());
                    sbCol.append("?");
                }
                insertSql = "INSERT INTO " + tableName + "(" + sb.toString() + ") VALUES (" + sbCol.toString() + ")";
            }
            return insertSql;
        }

        /**
         * 批量保存数据
         * @param dataList
         * @throws SQLException
         */
        private void save(List&ltRecord&gt dataList) throws SQLException {
            if (dataList.size() > 0) {
                PreparedStatement prepareStatement = connection.prepareStatement(insertSql);
                connection.setAutoCommit(false);
                for (Record record : dataList) {
                    for (int i = 0; i < record.getColumnNumber(); i++) {
                        prepareStatement.setObject(i + 1, record.getColumn(i).asObject());
                    }
                    prepareStatement.addBatch();
                }
                prepareStatement.executeBatch();
                connection.commit();
                prepareStatement.close();
                atomicLong.addAndGet(dataList.size());
                dataList.clear();
                dataList = null;
            }

        }

    }
}
                                        </pre>
                                    </div>
                                </div>
                            </fieldset>

                            <fieldset class="plugin-box" id="task-sm">
                                <legend>任务配置说明</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>任务必须包含参数<code>Bean</code>。</p>
                                            <p><code>Bean</code>指定为<code>Spring Bean</code>的类时必须是在Spring容器中的，否则报错。</p>
                                            <p><code>Bean</code>指定为<code>class全路径</code>时，会先在Spring容器中取，如果没有，则通过反射实例化<code>obj.getConstructor().newInstance()</code>一个对象，请确保有无参构造函数。</p>
                                        </div>
                                    </div>
                                </div>
                            </fieldset>
                            <fieldset class="plugin-box" id="task">
                                <legend>任务开发</legend>
                                <div class="row">
                                    <div class="col-sm-12">
                                        <div>
                                            <p>Task类必须继承<code>com.anluy.datapig.job.task.DataPigTask</code>实现<code>task</code>方法。</p>
                                            <p>实现<code>task()</code>方法，该方法为程序执行入口方法。</p>
                                            <p>实现<code>shutdown()</code>方法，方法中关闭必要的连接，如数据库连接、线程池等等。</p>
                                        </div>
                                        <pre class="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 测试定时任务(演示Demo，可删除)
 * ${DESCRIPTION}
 *
 * @author hc.zeng
 * @create 2018-11-05 17:02
 */
@Component("testTask")
public class TestTask extends DataPigTask{
	private Logger logger = LoggerFactory.getLogger(getClass());

	@Override
	public void task() {
		logger.info(String.format("我是任务[%s]，正在被执行。",this.getJobManager().getScheduleJob().getName()));
	}

	@Override
	public void shutdown() {
		this.setShutdown(true);
	}
}
                                        </pre>
                                    </div>
                                </div>
                            </fieldset>
                        </div>
                    </div>

                </div>
            </div>
        </div>
    </div>
</div>

<!-- 全局js -->
<script type="text/javascript">
    BASE_JS();
</script>
<script src="/js/plugins/highlight/highlight.pack.min.js" data-name="hightlight"></script>
<script type="text/javascript">
    $(function () {
        $('pre').each(function (i, block) {
            hljs.highlightBlock(block);
        });
    });
</script>
</body>

</html>
